package com.tca.common.learning.webflux.reactor.sink;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @author zhoua
 * @date 2022/1/10 11:03
 */
public class BackPressureTest {

    private static CreateTest.MyEventSource eventSource = new CreateTest.MyEventSource();

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    private static Subscriber subscriber = new BaseSubscriber() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(1);
        }

//        @Override
        protected void hookOnNext(CreateTest.MyEvent event) {
            System.out.println("                      receive <<< " + event.getMessage());
            try {
                TimeUnit.MILLISECONDS.sleep(3000);
            } catch (InterruptedException e) {
            }
            // 每处理完1个数据，就再请求1个
            request(1);
        }

        @Override
        protected void hookOnError(Throwable throwable) {
            System.err.println("                      receive <<< " + throwable);
        }

        @Override
        protected void hookOnComplete() {
            countDownLatch.countDown();
        }
    };

    /**
     * 创建Flux
     * @param strategy
     * @return
     */
    private static Flux<CreateTest.MyEvent> createFlux(FluxSink.OverflowStrategy strategy) {
        return Flux.create(sink -> {
            eventSource.register(new CreateTest.MyEventListener() {
                @Override
                public void onNewEvent(CreateTest.MyEvent event) {
                    System.out.println("publish >>>>>>>>> " + event.getMessage());
                    sink.next(event);
                }

                @Override
                public void onEventStopped() {
                    sink.complete();
                }
            });
        }, strategy);
    }

    /**
     * 产生事件
     * @param count 数量
     * @param millis 毫秒数
     */
    private static void generateEvent(int count, long millis) {
        for (int i = 0; i < count; i++) {
            try {
                TimeUnit.MILLISECONDS.sleep(millis);
            } catch (InterruptedException e) {
            }
            eventSource.newEvent(new CreateTest.MyEvent(new Date(), "Event-" + i));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Flux<CreateTest.MyEvent> publisher = createFlux(FluxSink.OverflowStrategy.BUFFER)
                .doOnRequest(n -> System.out.println(" ======> request: " + n + " ========"))
                .publishOn(Schedulers.newSingle("single"), 1);

        publisher.subscribe(subscriber);
        generateEvent(10, 500);
        countDownLatch.await();
    }
}
